package thread;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

/**
 * Single-consumer message queue built on an LMAX {@link Disruptor}.
 * <p>
 * Tasks passed to {@link #put(ITask)} are written into the ring buffer and handled by
 * the one thread that {@link #start()} creates. Producers may publish from several
 * threads (the disruptor is created with {@link ProducerType#MULTI}). Periodic tasks
 * registered through {@link #putTimerTask(ITimerTask)} run on a scheduler that is
 * shared by every instance of this class.
 *
 * @author King
 */
public class DisruptorSingleProcessor implements IMessageProcessor{

	/** Ring buffer capacity in slots: {@code 2 << 15} = 65536 messages. */
	int ringBufferSize = 2 << 15;

	/** Wait strategy handed to the disruptor for the consumer thread. */
	private final WaitStrategy strategy = new BlockingWaitStrategy();

	/** Single-thread executor running the consumer; {@code null} until {@link #start()}. */
	private ExecutorService executor = null;

	/** The disruptor itself; {@code null} until {@link #start()}. */
	private Disruptor<DistriptorHandler> disruptor = null;

	/** Ring buffer of the running disruptor; {@code null} until {@link #start()}. */
	private RingBuffer<DistriptorHandler> buffer = null;

	/** Factory used by the disruptor to pre-allocate the ring buffer slots. */
	private final DistriptorEventFactory eventFactory = new DistriptorEventFactory();

	/** Event handler shared by all processors. */
	private static final DistriptorEventHandler handler = new DistriptorEventHandler();

	/** Set to {@code true} once {@link #stop()} has completed. */
	private final AtomicBoolean istop = new AtomicBoolean();

	/** Thread group the consumer thread is created in. */
	private final ThreadGroup tg;

	/** Name given to the consumer thread. */
	private final String threadName;

	/** Scheduler for periodic tasks, shared by every processor instance. */
	private static final ScheduledThreadPoolExecutor scheduled =
			new ScheduledThreadPoolExecutor(3, new TimerThreadFacotry());

	/**
	 * Creates a processor whose consumer thread lives in a new thread group.
	 *
	 * @param tgName used both as the name of the new thread group and as the
	 *               name of the consumer thread
	 */
	public DisruptorSingleProcessor(String tgName)
	{
		this.tg = new ThreadGroup(tgName);
		this.threadName = tgName;
	}

	/**
	 * Creates a processor whose consumer thread lives in an existing thread group.
	 *
	 * @param tg         thread group for the consumer thread
	 * @param threadName name of the consumer thread
	 */
	public DisruptorSingleProcessor(ThreadGroup tg, String threadName)
	{
		this.tg = tg;
		this.threadName = threadName;
	}

	/**
	 * Builds the executor and the disruptor, registers the shared handler and
	 * starts consuming. Must be called before {@link #put(ITask)} or
	 * {@link #isFull()}, which both dereference the ring buffer created here.
	 */
	@SuppressWarnings("unchecked") // generic varargs call to handleEventsWith
	public void start() {
		executor = Executors.newSingleThreadExecutor(new LoopThreadfactory());
		disruptor = new Disruptor<DistriptorHandler>(
				eventFactory, ringBufferSize, executor, ProducerType.MULTI, strategy);
		buffer = disruptor.getRingBuffer();
		disruptor.handleEventsWith(handler);
		disruptor.start();
	}

	/** Thread factory for the shared scheduler; names threads "TimerThread N". */
	private static class TimerThreadFacotry implements ThreadFactory
	{
		/** Counter used to number the timer threads, starting at 1. */
		private final AtomicInteger timeThreadName = new AtomicInteger(0);

		public Thread newThread(Runnable r) {
			return new Thread(r, "TimerThread " + timeThreadName.incrementAndGet());
		}
	}

	/** Thread factory for the consumer thread: uses this processor's group and name. */
	private class LoopThreadfactory implements ThreadFactory {

		public Thread newThread(Runnable r) {
			Thread worker = new Thread(tg, r);
			worker.setName(threadName);
			return worker;
		}
	}

	/**
	 * @return the thread group the consumer thread is created in
	 */
	public ThreadGroup getTg() {
		return tg;
	}

	// NOTE(review): the three static fields below are not referenced anywhere in this
	// class. They are package-private, so they are kept in case other classes in the
	// package read or write them -- presumably throughput counters; confirm and remove
	// if unused.
	static int num = 1;
	static long start = System.currentTimeMillis();
	static long lastNum = 0;

	/**
	 * @return the scheduler shared by all processor instances
	 */
	public static ScheduledThreadPoolExecutor getScheduled() {
		return scheduled;
	}

	/**
	 * Shuts down the disruptor, its executor and the shared scheduler, then marks
	 * this processor as stopped. Calling it again after it has completed is a no-op.
	 * <p>
	 * Calling it before {@link #start()} is safe: the disruptor and executor do not
	 * exist yet and are simply skipped.
	 * <p>
	 * NOTE(review): the scheduler is static, so stopping one processor also shuts
	 * down timer scheduling for every other instance -- confirm this is intended.
	 */
	public void stop() {
		if (istop.get()) {
			return;
		}
		// Both are null when stop() is called before start(); skip instead of throwing.
		if (disruptor != null) {
			disruptor.shutdown();
		}
		if (executor != null) {
			executor.shutdown();
		}
		if (!scheduled.isShutdown()) {
			scheduled.shutdown();
		}
		istop.set(true);
	}

	/**
	 * @return the live stopped flag ({@code true} once {@link #stop()} has completed)
	 */
	public AtomicBoolean getIstop() {
		return istop;
	}

	/**
	 * Publishes a task to the ring buffer for the consumer thread.
	 * <p>
	 * {@code buffer.next()} is called without a prior capacity check; use
	 * {@link #isFull()} first if the caller needs to know whether a slot is free.
	 *
	 * @param msg task to hand to the consumer
	 */
	public void put(ITask msg)
	{
		long sequence = buffer.next();
		try {
			DistriptorHandler slot = buffer.get(sequence);
			slot.setTask(msg);
		} finally {
			// A claimed sequence must always be published; otherwise the ring buffer
			// stalls at this slot for every producer and for the consumer.
			buffer.publish(sequence);
		}
	}

	/**
	 * @return {@code true} if the ring buffer currently has no free slot
	 */
	public boolean isFull() {
		return !buffer.hasAvailableCapacity(1);
	}

	/**
	 * Schedules a periodic task on the shared scheduler at a fixed rate. The task's
	 * interval is used both as the initial delay and as the period. The task is
	 * given a reference to this processor and to its own {@link ScheduledFuture}.
	 *
	 * @param timerTask task to schedule; must be a {@code DistriptorTimerTask}
	 * @throws ClassCastException if {@code timerTask} is not a {@code DistriptorTimerTask}
	 */
	public void putTimerTask(ITimerTask timerTask) {
		DistriptorTimerTask timeAction = (DistriptorTimerTask) timerTask;
		timeAction.setDisruptorThread(this);
		long intervalMillis = timeAction.getIntervalMill();
		ScheduledFuture<?> future = scheduled.scheduleAtFixedRate(
				timeAction, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
		timeAction.setFuture(future);
	}

}
